package gora.cascading.example2;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.Aggregator;
import cascading.operation.Function;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexGenerator;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.scheme.Scheme;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

/**
 * line of text from a document file, 
 * parse it into words, then count the number of times each word appears.
 * 
 */
public final class WordCountBasic {

	/**
	   * Static method to count the number of times each word appears. This will be executed in Hadoop cluster
	   *
	   * @param input source of the file
	   * @param place where you want the count of words should be written
	   */
	
    public static void wordCount (String inputPath,String outputPath) {
    	
    // define source and sink Taps.
    Scheme sourceScheme = new TextLine( new Fields( "line" ) );
	Tap source = new Hfs( sourceScheme, inputPath );

	Scheme sinkScheme = new TextLine( new Fields( "word", "count" ) );
	Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE );

	// the 'head' of the pipe assembly
	Pipe assembly = new Pipe( "wordcount" );

	// For each input Tuple
	// parse out each word into a new Tuple with the field name "word"
	// regular expressions are optional in Cascading
	String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
	Function function = new RegexGenerator( new Fields( "word" ), regex ) ;
	assembly = new Each( assembly, new Fields( "line" ), function );

	// group the Tuple stream by the "word" value
	assembly = new GroupBy( assembly, new Fields( "word" ) );

	// For every Tuple group
	// count the number of occurrences of "word" and store result in
	// a field named "count"
	Aggregator count = new Count( new Fields( "count" ) );
	assembly = new Every( assembly, count );

	// plan a new Flow from the assembly using the source and sink Taps
	FlowConnector flowConnector = new HadoopFlowConnector(  );
	Flow flow = flowConnector.connect( "word-count", source, sink, assembly );

	// execute the flow, block until complete
    flow.complete();
    }
}
